package com.deep.flink.sqlserver.job;


import com.deep.flink.sqlserver.bean.CommonBean;
import com.deep.flink.sqlserver.config.CollectionConfig;
import com.deep.flink.sqlserver.map.CommonBeanMap;
import com.deep.flink.sqlserver.task.TBoaContractDocTask;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.Properties;

/**
 * @ClassName CollectionStream
 * @Author deeprado
 * @Version 1.0.0
 * @Description Entry point: builds a SQL Server CDC source and wires the processing pipeline.
 * @Date 2023/5/4 13:09
 */
public class CollectionStream {

    /**
     * Builds the Debezium properties used by the CDC source.
     *
     * @return configured Debezium connector properties
     */
    private static Properties buildDebeziumProperties() {
        Properties cdcProperties = new Properties();
        // Render unsigned BIGINT columns as Java long values.
        cdcProperties.setProperty("bigint.unsigned.handling.mode", "long");
        // Render DECIMAL/NUMERIC columns as strings to avoid the precision
        // loss that the "double" handling mode would introduce.
        cdcProperties.setProperty("decimal.handling.mode", "string");
        return cdcProperties;
    }

    /**
     * Builds the SQL Server CDC source from the values in {@link CollectionConfig}.
     *
     * <p>Startup option reference for {@code startupOptions(...)}:
     * <ol>
     *   <li>{@code initial()} - full snapshot, then continue reading the latest changes
     *       (best practice for the first run)</li>
     *   <li>{@code earliest()} - read from the earliest available change-log position</li>
     *   <li>{@code latest()} - read only changes that arrive from now on</li>
     *   <li>{@code specificOffset(file, pos)} - resume from a specific offset</li>
     *   <li>{@code timestamp(millis)} - start from a given timestamp</li>
     * </ol>
     *
     * @return a Debezium-backed source emitting change events as JSON strings
     */
    private static DebeziumSourceFunction<String> buildSource() {
        return SqlServerSource.<String>builder()
                .hostname(CollectionConfig.config.getProperty("sqlserver.hostname"))
                .port(Integer.parseInt(CollectionConfig.config.getProperty("sqlserver.port")))
                .database(CollectionConfig.config.getProperty("sqlserver.database")) // monitored database
                .tableList(CollectionConfig.config.getProperty("sqlserver.tableList")) // monitored tables
                .username(CollectionConfig.config.getProperty("sqlserver.username"))
                .password(CollectionConfig.config.getProperty("sqlserver.password"))
                .debeziumProperties(buildDebeziumProperties())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.initial())
                .build();
    }

    /**
     * Job entry point: reads SQL Server change events, converts them to
     * {@link CommonBean}, and hands them to the contract-document task.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the Flink web UI pinned to port 8081.
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8081);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        // SQL Server incremental business-data stream.
        // Parallelism 1 for the source keeps message ordering.
        DataStreamSource<String> streamSource = env.addSource(buildSource()).setParallelism(1);

        // Convert raw JSON change events into the common bean type.
        DataStream<CommonBean> commonBeanStream = streamSource.map(new CommonBeanMap());

        // Business processing: engineering contract documents.
        // (StudentTask processing is available but currently disabled.)
        new TBoaContractDocTask().process(commonBeanStream);

        // Trigger job execution.
        env.execute();
    }
}
